package com.atguigu.flink.datastreamapi.divide;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Created by Smexy on 2023/11/13

    分流后数据类型和分流前的类型是一致的，就使用filter.
    分流后数据类型和分流前不一致，还要进行一些转换，就使用process.

 将s1类型的传感器在主流输出，s2和s3类型的传感器各在侧流输出
 */
public class Demo2_ProcessSideOutPutDivide
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        OutputTag<String> outputTagS2 = new OutputTag<>("s2", Types.STRING);
        OutputTag<String> outputTagS3 = new OutputTag<>("s3", Types.STRING);

        //返回主流对象
        SingleOutputStreamOperator<String> ds1 = (SingleOutputStreamOperator<String>) ds
            .process(new ProcessFunction<WaterSensor, String>()
            {
                @Override
                public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                    if ("s1".equals(value.getId())) {
                        //主流输出
                        out.collect(value.toString());
                    } else if ("s2".equals(value.getId())) {
                        /*
                            侧流输出
                                output(
                                OutputTag<X> outputTag: 告诉程序侧流中输出的数据类型的标记(给侧流起个名字，数据类型TypeInfomation)
                                 X value： 放入侧流的数据
                                 )
                         */
                        ctx.output(outputTagS2, value.toString());
                    } else if ("s3".equals(value.getId())) {
                        ctx.output(outputTagS3, value.toString());
                    }
                }
            });

        //主流使用黑色打印
        ds1.print();

        //获取侧流，使用主流获取侧流
        SideOutputDataStream<String> s2Stream = ds1.getSideOutput(outputTagS2);
        SideOutputDataStream<String> s3Stream = ds1.getSideOutput(outputTagS3);

        s2Stream.printToErr("s2");
        s3Stream.printToErr("s3");


        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }
}
